
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import javax.annotation.Nullable;

import java.sql.Timestamp;
/**
 * @author zhangjun — follow my WeChat public account [大数据技术与应用实战] for more hands-on content
 * <p>
 * Writes streaming data into Hive using SQL.
 */
public class Hive_Write {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is mandatory for the Hive streaming sink: part files are only
        // rolled and partitions only committed when a checkpoint completes.
        bsEnv.enableCheckpointing(10000);
//        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);

        // Attach event timestamps and per-record (punctuated) watermarks to the source.
        // NOTE(review): AssignerWithPunctuatedWatermarks is deprecated in newer Flink
        // versions in favor of WatermarkStrategy — consider migrating when upgrading.
        DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource())
                .assignTimestampsAndWatermarks(
                        new AssignerWithPunctuatedWatermarks<UserInfo>() {
                            // Timestamp of the most recent element; reused as the watermark.
                            // Use uppercase 'L' — lowercase 'l' is easily misread as the digit 1.
                            long water = 0L;

                            @Nullable
                            @Override
                            public Watermark checkAndGetNextWatermark(
                                    UserInfo lastElement,
                                    long extractedTimestamp) {
                                // Flink calls extractTimestamp() for an element before this
                                // method, so `water` already holds that element's timestamp:
                                // the watermark advances with every record.
                                return new Watermark(water);
                            }

                            @Override
                            public long extractTimestamp(
                                    UserInfo element,
                                    long recordTimestamp) {
                                water = element.getTs().getTime();
                                return water;
                            }
                        });

//----------------------------------------------------------------------------------------------
        // Build the Hive catalog so Flink SQL can resolve Hive tables.
        String name = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/home/appleyuchi/bigdata/apache-hive-3.0.0-bin/conf"; // a local path
        String version = "3.1.2";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");
        // HIVE dialect is required so the DDL/DML below is parsed with Hive syntax.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.useDatabase("db1");

        // Expose the DataStream as a temporary table named "users" for SQL queries.
        tEnv.createTemporaryView("users", dataStream);

//      If the target table already exists in Hive, this DDL can be skipped.
//		String hiveSql = "CREATE external TABLE fs_table (\n" +
//		                 "  user_id STRING,\n" +
//		                 "  order_amount DOUBLE" +
//		                 ") partitioned by (dt string,h string,m string) " +
//		                 "stored as ORC " +
//		                 "TBLPROPERTIES (\n" +
//		                 "  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
//		                 "  'sink.partition-commit.delay'='0s',\n" +
//		                 "  'sink.partition-commit.trigger'='partition-time',\n" +
//		                 "  'sink.partition-commit.policy.kind'='metastore'" +
//		                 ")";
//		tEnv.executeSql(hiveSql);

        // Partition columns (dt, h, m) are derived from the event timestamp so the
        // sink writes minute-level partitions.
        String insertSql = "insert into  fs_table SELECT userId, amount, " +
                " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
        tEnv.executeSql(insertSql);

        // No bsEnv.execute() here: executeSql() on an INSERT already submits the
        // Flink job; calling execute() as well would be redundant.
//        env.execute();
    }

}